cdh里面怎么修改kafka的配置文件 | 您所在的位置:网站首页 › kafka properties配置 › cdh里面怎么修改kafka的配置文件 |
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。 然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。 Kafka架构与安全 首先,我们来了解下有关Kafka的几个基本概念: Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。 Producer:向Topic发布消息的进程称为Producer。 Consumer:从Topic订阅消息的进程称为Consumer。 Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。 Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance. Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。 然而,分析Kafka框架,我们会发现以下严重的安全问题: 1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。 2.网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。 3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。 4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。 随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。 Kafka安全设计 基于上述分析,Transwarp从以下两个方面增强Kafka的安全性: 身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。 权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。 基于Kerberos的身份机制如下图所示: Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。 Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接: 1.Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出 2.Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证) 3.Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。 ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。 Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/topic/user,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。 另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。 构建安全的Kafka服务 首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示: 其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab. 认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示: public class SecureProducer extends Thread { private final kafka.javaapi.producer.ProducerInteger, Stringproducer private final String topic private final Properties props = new Properties() public SecureProducer(String topic) { AuthenticationManager.setAuthMethod(“kerberos”) AuthenticationManager.login(“producer1″, “/etc/producer1.keytab”) props.put(“serializer.class”, “kafka.serializer.StringEncoder”) props.put(“metadata.broker.list”, “172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″) // Use random partitioner. Don’t need the key type. Just set it to Integer. // The message is of type String. producer = new kafka.javaapi.producer.ProducerInteger, String( new ProducerConfig(props)) this.topic = topic 一.打包项目镜像:
利用Dockerfile 来打包项目的镜像 本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像) 本次直接将Dockerfile写好后,用shell脚本build.sh启动打包:
然后切换到项目的目录下找到build.sh,运行即可打包项目镜像
若
报错:"failed to dial gRPC: cannot connect to the Docker daemon. Is 'docker daemon' running on this host?: dial unix /var/run/docker.sock: connect: permission denied " 就用
出现以下说明打包成功,接下来可以开始部署:
https://jingyan.baidu.com/article/9113f81b49ed2f2b3214c7fa.html
注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件 在hosts文件末尾加上kafka服务器!外网! 39. 0.25...地址,修改后的格式如下: 1.1注意: 修改阿里云服务器的hosts 文件来配置 kafka的服务器地址:
在hosts 文件最后加入:
添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,
监听远程kafka:新建消费者:
远程创建topic的实例:
查看远程已创建的topc:
本地:
远程修改后的kafka topic:
2.通过git Bash 切换到kafka客户端的bin目录: 桌面打开 gitBash,切换到本地kafka软件目录:
这里一定要切换为windows
3.查看已经有的topic --topic 指定topic名字 --replication-factor 指定副本数,因为我的是集群环境,这里副本数就为3 --partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好
注意:服务器部署时候一定要用内网172. .开头的,外部访问设为外网ip 不然会导致Kafka写入数据的时候报错 : TImeout
4.1本地docker创建topic:
4.2 本地windows 创建topic 进入本地软件路径KAFKA/BIN/WIONDOWS 创建topic
5.修改服务器的host: 一定要注意加sudo 不然会导致readonly 无法修改
在host 文件的末尾加上以下:
6.切换到工程部署的目录
7.清理redis,不然数据有残留: 7.1服务器上的redis挂载清除: 在 docker-compose.yml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据.
这个是用来记录redis中假如上次指定的是1到100万块,没有执行完.下次接着执行没执行完的任务,测试时暂时关闭
7.2删除volume:
7.3 如果volume文件被占用时,先删除占用容器:
7.4 清除redis中的数据 进入redis容器中:
8.部署命令: 8.1开启docker可视化web上监控docker:
然后访问: http://39.100.48.41:9000 宿主机IP + 9000端口
8.2执行部署命令,启动服务:
9.部署时报错: yaml: line 46: did not find expected key 原因: docker-compose.yml文件中第46行 报错
解决:将所有数据对齐,不要有多余的空格. Apache官网 http://kafka.apache.org/downloads.html
tar -xzf kafka_2.10-0.8.2.2.tgz cd kafka_2.10-0.8.2.2
KAFKA需要启动两个服务: 1,启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties (能在执行指令时退出操作) 2,启动kafka: bin/kafka-server-start.sh config/server.properties
1,创建topic bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test 创建一个名为test的topic,只有一个副本,一个分区。 2,创建生产者,启动producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test topic要对应之前创建的topic 之后便可以发送消息 3,创建消费者,启动consumer bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning topic要对应之前创建的topic 在生产者发送消息后,消费者便可以收到对应的消息了
http://blog.csdn.net/suifeng3051/article/details/48053965 欢迎分享,转载请注明来源:内存溢出 原文地址:https://outofmemory.cn/bake/8000437.html |
CopyRight 2018-2019 实验室设备网 版权所有 |